
import java.time.{LocalDate, Period}
import java.util

import com.alibaba.fastjson.serializer.SerializeConfig
import com.alibaba.fastjson.{JSON, JSONObject}
import com.atguigu.gmall.realtime.bean.{OrderDetail, OrderInfo, OrderWide}
import com.atguigu.gmall.realtime.utils.{MyEsUtils, MyKafkaUtils, MyOffsetUtils, MyRedisUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

import scala.collection.mutable.ListBuffer

/**
 * Real-time DWD job that builds the order wide table.
 *
 * Pipeline:
 *  1. Prepare the streaming environment.
 *  2. Read the committed offsets of both topics from Redis.
 *  3. Consume both topics from Kafka, resuming from the committed offsets when present.
 *  4. Capture each batch's offset ranges.
 *  5. Process data:
 *     5.1 parse the JSON records into bean objects
 *     5.2 enrich order info with user / province dimensions read from Redis
 *     5.3 join the two streams, using a Redis cache to bridge records that land in different batches
 *  6. Bulk-write the wide records to Elasticsearch.
 *  7. Commit the offsets of both topics.
 */
object DwdOrderApp {

  /** TTL (seconds) of the cross-batch join cache entries: 24 hours. */
  private final val CacheTtlSeconds = 3600 * 24

  def main(args: Array[String]): Unit = {
    // 1. Streaming environment.
    val conf: SparkConf = new SparkConf().setAppName("orderTableWide").setMaster("local[4]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))

    // 2. Read committed offsets from Redis.
    val orderInfoTopic: String = "DWD_ORDER_INFO_0212_I"
    val orderDetailTopic: String = "DWD_ORDER_DETAIL_0212_I"
    val groupId: String = "DWD_ORDER_GROUP"

    val orderInfoOffsets: Map[TopicPartition, Long] = MyOffsetUtils.ReadOffset(orderInfoTopic, groupId)
    val orderDetailOffsets: Map[TopicPartition, Long] = MyOffsetUtils.ReadOffset(orderDetailTopic, groupId)

    // 3. Kafka streams (resume from the committed offsets if any were found).
    val orderInfoDStream: InputDStream[ConsumerRecord[String, String]] =
      createStream(ssc, orderInfoTopic, groupId, orderInfoOffsets)
    val orderDetailDStream: InputDStream[ConsumerRecord[String, String]] =
      createStream(ssc, orderDetailTopic, groupId, orderDetailOffsets)

    // 4. Capture each batch's offset ranges; they are committed in step 7 after the ES write.
    var orderInfoOffsetRanges: Array[OffsetRange] = null
    val orderInfoOffsetDS: DStream[ConsumerRecord[String, String]] = orderInfoDStream.transform { rdd =>
      orderInfoOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }
    var orderDetailOffsetRanges: Array[OffsetRange] = null
    val orderDetailOffsetDS: DStream[ConsumerRecord[String, String]] = orderDetailDStream.transform { rdd =>
      orderDetailOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    // 5.1 Parse JSON records into beans.
    val orderInfoObjDStream: DStream[OrderInfo] =
      orderInfoOffsetDS.map(record => JSON.parseObject(record.value(), classOf[OrderInfo]))
    val orderDetailObjDStream: DStream[OrderDetail] =
      orderDetailOffsetDS.map(record => JSON.parseObject(record.value(), classOf[OrderDetail]))

    // 5.2 Dimension enrichment: one Redis connection per partition, always released.
    val orderInfoWithDimDStream: DStream[OrderInfo] = orderInfoObjDStream.mapPartitions { orderInfoIter =>
      val orderInfos: List[OrderInfo] = orderInfoIter.toList
      val jedis: Jedis = MyRedisUtils.get()
      try {
        orderInfos.foreach(orderInfo => enrichOrderInfo(orderInfo, jedis))
      } finally {
        MyRedisUtils.close(jedis)
      }
      orderInfos.iterator
    }

    // 5.3 Dual-stream join. DStream joins require (key, value) pairs, keyed here by order id.
    //
    // Which join? Table-wise, every order_info row and every order_detail row eventually has a match,
    // so any join type would give the same result. Stream-wise, however, only records in the SAME batch
    // can join; if one stream lags, matching records fall into different batches and an inner join would
    // drop them. Delayed results are acceptable, lost results are not, so we use a full outer join and
    // bridge unmatched records across batches through a Redis cache (see joinWithCache).
    val orderInfoKVDStream: DStream[(Long, OrderInfo)] =
      orderInfoWithDimDStream.map(info => (info.id, info))
    val orderDetailKVDStream: DStream[(Long, OrderDetail)] =
      orderDetailObjDStream.map(detail => (detail.order_id, detail))
    val orderFullJoinDStream: DStream[(Long, (Option[OrderInfo], Option[OrderDetail]))] =
      orderInfoKVDStream.fullOuterJoin(orderDetailKVDStream)

    val orderWideDStream: DStream[OrderWide] = orderFullJoinDStream.mapPartitions { joinedIter =>
      val joined: List[(Long, (Option[OrderInfo], Option[OrderDetail]))] = joinedIter.toList
      val jedis: Jedis = MyRedisUtils.get()
      // Field-based serialization, created once per partition rather than once per record.
      // NOTE(review): presumably needed because the Scala beans lack JavaBean getters — confirm.
      val serializeConfig: SerializeConfig = new SerializeConfig(true)
      val orderWides: List[OrderWide] =
        try {
          joined.flatMap { case (_, (orderInfoOpt, orderDetailOpt)) =>
            joinWithCache(orderInfoOpt, orderDetailOpt, jedis, serializeConfig)
          }
        } finally {
          MyRedisUtils.close(jedis)
        }
      orderWides.iterator
    }

    // 6. Write to ES.
    //    Index planning: one index per day (gmall_order_wide_<create_date>), with an alias and an
    //    index template matching "gmall_order_wide*".
    //    Write mode: batched, idempotent (document id = detail_id).
    orderWideDStream.foreachRDD { rdd =>
      rdd.foreachPartition { orderWideIter =>
        // Group by create_date so every record goes to its own day's index, even when a partition
        // spans midnight. An empty partition produces no groups and therefore no write.
        orderWideIter.toList.groupBy(_.create_date).foreach { case (dt, wides) =>
          val docs: List[(String, OrderWide)] = wides.map(wide => (wide.detail_id.toString, wide))
          MyEsUtils.bulkSave(s"gmall_order_wide_$dt", docs)
        }
      }
      // 7. Commit offsets only after the batch has been written.
      MyOffsetUtils.SaveOffset(orderInfoTopic, groupId, orderInfoOffsetRanges)
      MyOffsetUtils.SaveOffset(orderDetailTopic, groupId, orderDetailOffsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Creates the Kafka stream for `topic`.
   *
   * @param offsets previously committed offsets; when non-empty, consumption resumes from them,
   *                otherwise the consumer starts without explicit offsets.
   */
  private def createStream(
      ssc: StreamingContext,
      topic: String,
      groupId: String,
      offsets: Map[TopicPartition, Long]): InputDStream[ConsumerRecord[String, String]] =
    if (offsets.nonEmpty) MyKafkaUtils.GetConsumerDStream(ssc, topic, groupId, offsets)
    else MyKafkaUtils.GetConsumerDStream(ssc, topic, groupId)

  /**
   * Fills user, province and date fields of `orderInfo` in place.
   *
   * User data is read from `DIM:USER_INFO:<user_id>` and province data from
   * `DIM:BASE_PROVINCE:<province_id>`. A missing Redis entry, a missing birthday, or a
   * `create_time` without a time part leaves the affected fields untouched instead of failing the batch.
   */
  private def enrichOrderInfo(orderInfo: OrderInfo, jedis: Jedis): Unit = {
    // User dimension: gender, and age in whole years between birthday and today.
    parseJsonObject(jedis.get(s"DIM:USER_INFO:${orderInfo.user_id}")).foreach { user =>
      orderInfo.user_gender = user.getString("gender")
      Option(user.getString("birthday")).foreach { birthday =>
        orderInfo.user_age = Period.between(LocalDate.parse(birthday), LocalDate.now()).getYears
      }
    }

    // Province dimension.
    parseJsonObject(jedis.get(s"DIM:BASE_PROVINCE:${orderInfo.province_id}")).foreach { province =>
      orderInfo.province_name = province.getString("name")
      orderInfo.province_area_code = province.getString("area_code")
      orderInfo.province_3166_2_code = province.getString("iso_3166_2")
      orderInfo.province_iso_code = province.getString("iso_code")
    }

    // Date fields: create_time is split on the space into date and time; the hour is the time's first ":" segment.
    Option(orderInfo.create_time).foreach { createTime =>
      val parts: Array[String] = createTime.split(" ")
      orderInfo.create_date = parts(0)
      if (parts.length > 1) orderInfo.create_hour = parts(1).split(":")(0)
    }
  }

  /** Parses a Redis value into a JSONObject; None when the key was absent or parsing yields null. */
  private def parseJsonObject(json: String): Option[JSONObject] =
    Option(json).flatMap(text => Option(JSON.parseObject(text)))

  /**
   * Produces the wide rows for one full-outer-join result, bridging batches through Redis.
   *
   *  - order_info present:
   *    1. Emit a row for the same-batch detail, if any.
   *    2. Always cache the order_info (string key, 24h TTL), because some of its details may
   *       arrive in later batches.
   *    3. Emit rows for details that arrived earlier and were cached.
   *  - only order_detail present:
   *    - If the order_info is in the cache, join with it.
   *    - Otherwise add the detail to the per-order set cache (24h TTL) so the order_info can pick
   *      it up later. Details are many-to-one with orders, so they are cached only when unmatched.
   *
   * Reprocessing after a failure may emit duplicate rows; the downstream ES write is keyed by
   * detail_id, so duplicates overwrite rather than accumulate.
   */
  private def joinWithCache(
      orderInfoOpt: Option[OrderInfo],
      orderDetailOpt: Option[OrderDetail],
      jedis: Jedis,
      serializeConfig: SerializeConfig): List[OrderWide] =
    (orderInfoOpt, orderDetailOpt) match {
      case (Some(orderInfo), sameBatchDetail) =>
        val sameBatchWides: List[OrderWide] =
          sameBatchDetail.map(detail => new OrderWide(orderInfo, detail)).toList
        jedis.setex(orderInfoCacheKey(orderInfo.id), CacheTtlSeconds, JSON.toJSONString(orderInfo, serializeConfig))
        sameBatchWides ++ cachedOrderDetails(orderInfo.id, jedis).map(detail => new OrderWide(orderInfo, detail))

      case (None, Some(orderDetail)) =>
        Option(jedis.get(orderInfoCacheKey(orderDetail.order_id))) match {
          case Some(orderInfoJson) =>
            List(new OrderWide(JSON.parseObject(orderInfoJson, classOf[OrderInfo]), orderDetail))
          case None =>
            val detailKey: String = orderDetailCacheKey(orderDetail.order_id)
            jedis.sadd(detailKey, JSON.toJSONString(orderDetail, serializeConfig))
            jedis.expire(detailKey, CacheTtlSeconds)
            Nil
        }

      // Not produced by fullOuterJoin; handled for exhaustiveness.
      case (None, None) => Nil
    }

  /** Reads the order_detail records cached for `orderId` (empty when none are cached). */
  private def cachedOrderDetails(orderId: Long, jedis: Jedis): List[OrderDetail] = {
    import scala.collection.JavaConverters._
    Option(jedis.smembers(orderDetailCacheKey(orderId)))
      .map(_.asScala.toList)
      .getOrElse(Nil)
      .map(json => JSON.parseObject(json, classOf[OrderDetail]))
  }

  // Single source of truth for the join-cache keys, so the reader and writer can never disagree.
  /** Redis string key holding a cached order_info JSON: ORDERJOIN:ORDER_INFO:<orderId>. */
  private def orderInfoCacheKey(orderId: Long): String = s"ORDERJOIN:ORDER_INFO:$orderId"

  /** Redis set key holding cached order_detail JSONs for one order: ORDERJOIN:ORDER_DETAIL:<orderId>. */
  private def orderDetailCacheKey(orderId: Long): String = s"ORDERJOIN:ORDER_DETAIL:$orderId"
}
